00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _ga_shuffler_hpp_
00022 #define _ga_shuffler_hpp_
00023
#include <algorithm>
#include <cstring>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <boost/assert.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>

#include <ga++.h>

#include <gridpack/utilities/uncopyable.hpp>
#include <gridpack/parallel/distributed.hpp>
00038
00039 namespace gridpack {
00040 namespace parallel {
00041
00042
00043
00044
00045 template <typename Thing, typename Index = int>
00046 class gaShuffler
00047 : public Distributed,
00048 private utility::Uncopyable
00049 {
00050 public:
00051
00052
00053
00054
00055 typedef std::vector<Thing> ThingVector;
00056 typedef std::vector<Index> IndexVector;
00057
00058
00059 typedef boost::archive::binary_oarchive oArchive;
00060 typedef boost::archive::binary_iarchive iArchive;
00061
00062
00063 gaShuffler(const Communicator& comm)
00064 : Distributed(comm), utility::Uncopyable(),
00065 p_gaBuffers(processor_size()),
00066 p_gaBufferSize(processor_size())
00067 {}
00068
00069
00070 ~gaShuffler(void)
00071 {}
00072
00073
00074 void operator() (ThingVector& locthings, const IndexVector& destproc)
00075 {
00076 BOOST_ASSERT(locthings.size() == destproc.size());
00077
00078 int me(processor_rank());
00079 int nproc(processor_size());
00080
00081 if (nproc <= 1) return;
00082
00083 size_t nthings(0);
00084 boost::mpi::all_reduce(this->communicator(), locthings.size(), nthings,
00085 std::plus<size_t>());
00086 if (nthings <= 0) return;
00087
00088 int theGAgroup(communicator().getGroup());
00089 int oldGAgroup = GA_Pgroup_get_default();
00090 GA_Pgroup_set_default(theGAgroup);
00091
00092
00093
00094 ThingVector tvect(locthings);
00095 locthings.clear();
00096
00097
00098
00099 std::vector< ThingVector > tosend(processor_size());
00100
00101
00102
00103
00104 typename ThingVector::iterator t(tvect.begin());
00105 typename IndexVector::const_iterator dest(destproc.begin());
00106 for (; dest != destproc.end(); ++dest, ++t) {
00107 if (*dest == processor_rank()) {
00108 locthings.push_back(*t);
00109 } else {
00110 tosend[*dest].push_back(*t);
00111 }
00112 }
00113
00114 for (int p = 0; p < nproc; ++p) {
00115 if (p != me) {
00116 std::ostringstream ostr(std::ios::binary);
00117 oArchive oarch(ostr);
00118 oarch & tosend[p];
00119 int buflen = MA_sizeof(MT_CHAR, ostr.str().length(), p_gaType);
00120 p_gaBuffers[p].resize(buflen);
00121 memcpy(&(p_gaBuffers[p][0]),
00122 ostr.str().c_str(),
00123 ostr.str().length()*sizeof(char));
00124 p_gaBufferSize[p] = p_gaBuffers[p].size();
00125 }
00126 }
00127
00128 size_t lmaxsize(0), thesize;
00129 lmaxsize = *std::max_element(p_gaBufferSize.begin(),
00130 p_gaBufferSize.end());
00131 boost::mpi::all_reduce(communicator(), lmaxsize, thesize,
00132 boost::mpi::maximum<size_t>());
00133
00134 int dims[2] = { nproc, static_cast<int>(thesize) + 1 };
00135 int lo[2], hi[2], ld[2] = {1, 1};
00136 boost::scoped_ptr<GA::GlobalArray>
00137 ga(new GA::GlobalArray(p_gaType, 2, dims, "serialized data", NULL)),
00138 gasize(new GA::GlobalArray(MT_C_INT, 1, dims, "serialized data sizes", NULL));
00139
00140 for (int src = 0; src < nproc; ++src) {
00141 if (me == src) {
00142 lo[0] = 0, hi[0] = nproc - 1;
00143 gasize->put(&lo[0], &hi[0], &(p_gaBufferSize[0]), &ld[0]);
00144 for (int p = 0; p < nproc; ++p) {
00145 if (p != me) {
00146 std::cerr << "gaShuffler: "
00147 << me << ": sending " << p_gaBufferSize[p]
00148 << " integers to process " << p
00149 << std::endl;
00150 lo[0] = hi[0] = p;
00151 lo[1] = 0; hi[1] = p_gaBufferSize[p] - 1;
00152 ga->put(&lo[0], &hi[0], &(p_gaBuffers[p][0]), &ld[0]);
00153 }
00154 }
00155 }
00156 communicator().sync();
00157 if (me != src) {
00158 int mybufsize;
00159 lo[0] = hi[0] = me;
00160 gasize->get(&lo[0], &hi[0], &mybufsize, &ld[0]);
00161 lo[1] = 0; hi[1] = mybufsize - 1;
00162 p_gaBuffers[me].clear();
00163 p_gaBuffers[me].resize(mybufsize);
00164 std::cerr << "gaShuffler: "
00165 << me << ": getting " << mybufsize
00166 << " integers from process " << src
00167 << std::endl;
00168 ga->get(&lo[0], &hi[0], &(p_gaBuffers[me][0]), &ld[1]);
00169 int buflen(p_gaBuffers[me].size());
00170 std::string s((char *)(&(p_gaBuffers[me][0])),
00171 buflen*sizeof(gaBufferType::value_type));
00172 p_gaBuffers[me].clear();
00173 std::istringstream is(s, std::ios::binary);
00174 iArchive iarch(is);
00175 ThingVector tmp;
00176 iarch & tmp;
00177 std::copy(tmp.begin(), tmp.end(), std::back_inserter(locthings));
00178 }
00179 communicator().sync();
00180 }
00181
00182 ga.reset();
00183 gasize.reset();
00184
00185 p_reset();
00186
00187 GA_Pgroup_set_default(oldGAgroup);
00188 }
00189
00190 private:
00191
00192
00193 static const int p_gaType = MT_C_INT;
00194
00195
00196 typedef std::vector<int> gaBufferType;
00197
00198
00199 std::vector<gaBufferType> p_gaBuffers;
00200
00201
00202 std::vector<int> p_gaBufferSize;
00203
00204
00205 void p_reset(void)
00206 {
00207 for (size_t p = 0; p < p_gaBuffers.size(); ++p) {
00208 p_gaBuffers[p].clear();
00209 p_gaBufferSize[p] = 0;
00210 }
00211 }
00212 };
00213
00214 }
00215 }
00216
00217 #endif